package com.yiidata.logback.amqp;


import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import ch.qos.logback.core.Layout;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang.StringUtils;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/**
 * Logback appender that publishes log events to an AMQP (RabbitMQ) exchange.
 *
 * <p>Each event is formatted with the configured {@link Layout}, encoded with
 * {@link #defaultCharset} and published to {@link #exchangeName} using
 * {@link #routingKey} (optionally suffixed with the event level). Event
 * metadata (context, level, timestamp, logger and thread name) is sent as
 * AMQP message headers.
 *
 * <p>If publishing fails, the connection is rebuilt and the publish is retried
 * a bounded number of times; failures are reported through Logback's status
 * API and never thrown to the logging caller.
 *
 * <pre>
 *
 * Created by zhenqin.
 * User: zhenqin
 * Date: 2021/6/24
 * Time: 14:15
 * Vendor: yiidata.com
 *
 * </pre>
 *
 * @author zhenqin
 */
@Setter
@Getter
public class AmqpAppender extends AppenderBase<ILoggingEvent> {

    /**
     * Maximum number of reconnect-and-republish attempts made after a publish fails.
     */
    private static final int MAX_RECONNECT_ATTEMPTS = 3;

    /**
     * The Layout used to format the log message body.
     */
    private Layout<ILoggingEvent> layout;

    /**
     * AMQP server connection parameters.
     */
    String host = "localhost";
    int port = 5672;
    String username;
    String password;

    String virtualHost = "/";

    /**
     * Type of the exchange declared on (re)connect.
     */
    BuiltinExchangeType exchangeType = BuiltinExchangeType.DIRECT;

    /**
     * Name of the exchange that messages are published to.
     */
    String exchangeName;

    /**
     * Routing key used when publishing.
     */
    String routingKey;

    /**
     * Whether to append {@code "_" + level} to the routing key so that events
     * of different levels are routed separately.
     */
    boolean useRoutingKeyLevel = false;


    /**
     * Whether the declared exchange is durable.
     */
    boolean durable = false;


    /**
     * Whether the declared exchange is auto-deleted.
     */
    boolean autoDelete = false;

    /**
     * Charset used to encode the formatted message into the payload bytes.
     */
    Charset defaultCharset = StandardCharsets.UTF_8;


    /**
     * Connection factory built in {@link #start()} and reused by {@link #reconnect()}.
     */
    ConnectionFactory factory;

    /**
     * Connection and Channel properties that will be
     * initialized using the above provided properties.
     */
    private Connection connection;

    private Channel channel;


    /**
     * Called by Logback once the appender has been configured. Builds the
     * connection factory from the configured properties and opens the
     * connection.
     *
     * <p>If no layout is configured, an error status is recorded and the
     * appender is left stopped.
     *
     * @throws IllegalStateException if the broker connection cannot be established;
     *                               start-up connection failures are deliberately propagated
     */
    @Override
    public void start() {
        if (this.layout == null) {
            // Without a layout every append() would fail with a NullPointerException.
            addError("No layout set for the appender named \"" + getName() + "\".");
            return;
        }

        // Create a Connection and Channel to the AMQP server
        // using supplied properties.
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(this.host);
        connectionFactory.setUsername(this.username);
        connectionFactory.setPassword(this.password);
        connectionFactory.setVirtualHost(this.virtualHost);
        connectionFactory.setPort(this.port);
        connectionFactory.setRequestedHeartbeat(0);
        setFactory(connectionFactory);

        // Errors during start-up must be thrown to the caller.
        reconnect();
        super.start();
    }


    /**
     * (Re)establishes the connection and channel and declares the exchange.
     * Any previously held connection is closed first so that repeated calls
     * do not leak connections.
     *
     * @throws IllegalStateException if the connection, the channel or the
     *                               exchange declaration fails
     */
    public synchronized void reconnect() {
        // Drop whatever we were holding before opening a new connection.
        close();
        try {
            this.connection = factory.newConnection();
            this.channel = this.connection.createChannel();
            // Declare the exchange with the configured type and flags.
            channel.exchangeDeclare(exchangeName, exchangeType, durable, autoDelete, null);
        } catch (final Exception e) {
            // Do not keep a half-initialised connection around.
            close();
            throw new IllegalStateException(
                    "Unable to connect to AMQP server " + this.host + ":" + this.port, e);
        }
    }


    /**
     * Closes the channel and the connection, ignoring errors, and clears both
     * references. Safe to call when nothing is open.
     */
    public synchronized void close() {
        try {
            if (this.channel != null) {
                this.channel.close();
            }
        } catch (Exception ignored) {
            // Best effort: the channel may already be closed or broken.
        } finally {
            this.channel = null;
        }
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        } catch (Exception ignored) {
            // Best effort: the connection may already be closed or broken.
        } finally {
            this.connection = null;
        }
    }


    /**
     * Closes the AMQP resources and marks the appender as stopped.
     */
    @Override
    public void stop() {
        close();
        super.stop();
    }

    /**
     * Formats the event and publishes it to the exchange. On failure the
     * connection is rebuilt and the publish retried up to
     * {@link #MAX_RECONNECT_ATTEMPTS} times; if all attempts fail the event is
     * dropped and the errors are reported via the Logback status API.
     *
     * @param event the logging event to publish
     */
    @Override
    protected void append(final ILoggingEvent event) {
        // Add useful information about the logging event
        // to AMQP message headers.
        final Map<String, Object> headers = new HashMap<>();
        headers.put("context", event.getLoggerContextVO().getName());
        headers.put("level", event.getLevel().levelStr);
        headers.put("timestamp", event.getTimeStamp());
        headers.put("loggerName", event.getLoggerName());
        headers.put("threadName", event.getThreadName());

        // Build the publishing key, optionally suffixed with the log level.
        final Level level = event.getLevel();
        final String publishKey = useRoutingKeyLevel ? routingKey + "_" + level.levelStr : routingKey;

        // Use the provided layout to format the logging event
        // and set that as the AMQP message payload.
        final byte[] payload = this.layout.doLayout(event).getBytes(defaultCharset);

        final BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("text/plain")
                .deliveryMode(2)
                .priority(0)
                .headers(headers)
                .build();

        try {
            publish(publishKey, props, payload);
            return;
        } catch (IOException | RuntimeException e) {
            // Runtime exceptions are caught too: the client signals a closed
            // channel with an unchecked exception.
            addError("Failed to publish log event to exchange '" + this.exchangeName
                    + "', reconnecting", e);
        }

        // Only release the AMQP resources here. Calling stop() would mark the
        // appender as stopped and silently discard all subsequent events.
        for (int attempt = 1; attempt <= MAX_RECONNECT_ATTEMPTS; attempt++) {
            try {
                reconnect();
                publish(publishKey, props, payload);
                return;
            } catch (IOException | RuntimeException retryFailure) {
                // reconnect() reports failures as IllegalStateException, so
                // runtime exceptions must be caught for the retry to continue.
                addError("Reconnect attempt " + attempt + " of " + MAX_RECONNECT_ATTEMPTS
                        + " failed", retryFailure);
            }
        }
    }

    /**
     * Publishes one message on the current channel.
     *
     * @throws IOException if no channel is open or the publish fails
     */
    private void publish(final String publishKey, final BasicProperties props, final byte[] payload)
            throws IOException {
        final Channel current = this.channel;
        if (current == null) {
            throw new IOException("AMQP channel is not open");
        }
        current.basicPublish(this.exchangeName, publishKey, props, payload);
    }

    /**
     * Sets the payload charset by name.
     *
     * @param charset a charset name accepted by {@link Charset#forName(String)}
     */
    public void setCharset(String charset) {
        this.defaultCharset = Charset.forName(charset);
    }

    /**
     * Sets the exchange type from its name (case-insensitive, surrounding
     * whitespace ignored). A {@code null} or blank value selects {@code DIRECT}.
     *
     * @param exchangeType one of {@code direct}, {@code fanout}, {@code topic}, {@code headers}
     * @throws IllegalArgumentException if the name is not a known exchange type
     */
    public void setExchangeType(String exchangeType) {
        // Locale.ROOT keeps the upper-casing independent of the JVM default
        // locale (e.g. Turkish would turn "direct" into "DİRECT").
        final String exchangeTypeUp = Optional.ofNullable(StringUtils.trimToNull(exchangeType))
                .orElse("DIRECT")
                .toUpperCase(Locale.ROOT);
        switch (exchangeTypeUp) {
            case "DIRECT":
                this.exchangeType = BuiltinExchangeType.DIRECT;
                break;
            case "FANOUT":
                this.exchangeType = BuiltinExchangeType.FANOUT;
                break;
            case "TOPIC":
                this.exchangeType = BuiltinExchangeType.TOPIC;
                break;
            case "HEADERS":
                this.exchangeType = BuiltinExchangeType.HEADERS;
                break;
            default:
                throw new IllegalArgumentException("unknown exchangeType: " + exchangeType);
        }

    }

    /**
     * Returns a summary of the connection settings; the password is deliberately omitted.
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder();
        sb.append("host: ");
        sb.append(this.host);
        sb.append(", port: ");
        sb.append(this.port);
        sb.append(", virtualHost: ");
        sb.append(this.virtualHost);
        sb.append(", username: ");
        sb.append(this.username);
        sb.append(", exchangeName: ");
        sb.append(this.exchangeName);
        sb.append(", routingKey: ");
        sb.append(this.routingKey);
        return sb.toString();
    }
}
